[SPARK-23821][SQL] Collection function: flatten#20938
[SPARK-23821][SQL] Collection function: flatten#20938mn-mikke wants to merge 20 commits intoapache:masterfrom
Conversation
python/pyspark/sql/functions.py
Outdated
|
|
||
| :param col: name of column or expression | ||
|
|
||
| >>> df = spark.createDataFrame([([[1, 2, 3], [4, 5], [6]],),([None, [4, 5]],)], ['data']) |
|
Thanks for your contribution! Try to improve your test cases by reading the other open source code (e.g., this)? |
|
ok to test |
|
Test build #88758 has finished for PR 20938 at commit
|
|
Rewrote test cases. @gatorsmile Please let me know if it's OK. |
|
Test build #88836 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #88842 has finished for PR 20938 at commit
|
|
Any other comments? |
| if ( | ||
| ArrayType.acceptsType(child.dataType) && | ||
| ArrayType.acceptsType(child.dataType.asInstanceOf[ArrayType].elementType) | ||
| ) { |
There was a problem hiding this comment.
How about this?
child.dataType match {
case _: ArrayType(_: ArrayType, _) =>
TypeCheckResult.TypeCheckSuccess
case _: =>
TypeCheckResult.TypeCheckFailure(
"The argument should be an array of arrays, " +
s"but '${child.sql}' is of ${child.dataType.simpleString} type.")
}| override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { | ||
| nullSafeCodeGen(ctx, ev, c => { | ||
| val code = | ||
| if (CodeGenerator.isPrimitiveType(dataType.elementType)) { |
There was a problem hiding this comment.
very tiny nit: shall we move this line up?
val code = if (Code...
| |if(!${ev.isNull}) { | ||
| | $coreLogic | ||
| |} | ||
| """.stripMargin |
| * @group collection_funcs | ||
| * @since 2.4.0 | ||
| */ | ||
| def flatten(e: Column): Column = withExpr{ Flatten(e.expr) } |
| checkAnswer( | ||
| oneRowDF.selectExpr("flatten(array(arr, array(null, 5), array(6, null)))"), | ||
| Seq(Row(Seq(1, 2, 3, null, 5, 6, null))) | ||
| ) |
| Examples: | ||
| > SELECT _FUNC_(array(array(1, 2), array(3, 4)); | ||
| [1,2,3,4] | ||
| """) |
| val elements = array.asInstanceOf[ArrayData].toObjectArray(dataType) | ||
|
|
||
| if (elements.contains(null)) { | ||
| null |
There was a problem hiding this comment.
does this mean if input array has null in the elements, return null ignoring other elements when we are not in codegen?
There was a problem hiding this comment.
Yes, you are right. The function also behaves the same way when codegen is applied. See test cases with a null array in CollectionExpressionsSuite.
We can discuss whether the function should behave the same way as in Presto and just ignore null elements... But I think that the current approach fits more into the semantics of Spark functions.
concat("a",null,"c") => null
1 + null => null
...
|
Test build #89055 has finished for PR 20938 at commit
|
|
Can't reproduce it locally and seems to unrelated... |
|
retest this please |
|
Test build #89093 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #89104 has finished for PR 20938 at commit
|
|
Any idea why those tests are failing? |
|
cc @ueshin |
|
Test build #89119 has finished for PR 20938 at commit
|
|
Test build #89303 has finished for PR 20938 at commit
|
| coreLogic: String): String = { | ||
| s""" | ||
| |for(int z=0; z < $childVariableName.numElements(); z++) { | ||
| | ${ev.isNull} |= $childVariableName.isNullAt(z); |
There was a problem hiding this comment.
How about breaking when null is found?
| since = "2.4.0") | ||
| case class Flatten(child: Expression) extends UnaryExpression { | ||
|
|
||
| override def nullable: Boolean = child.nullable || dataType.containsNull |
There was a problem hiding this comment.
child.nullable || child.dataType.asInstanceOf[ArrayType].containsNull?
| val code = if (CodeGenerator.isPrimitiveType(dataType.elementType)) { | ||
| genCodeForConcatOfPrimitiveElements(ctx, c, ev.value) | ||
| } else { | ||
| genCodeForConcatOfComplexElements(ctx, c, ev.value) |
There was a problem hiding this comment.
I'm wondering if we say "complex" for non-primitive types?
| override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { | ||
| nullSafeCodeGen(ctx, ev, c => { | ||
| val code = if (CodeGenerator.isPrimitiveType(dataType.elementType)) { | ||
| genCodeForConcatOfPrimitiveElements(ctx, c, ev.value) |
| if (elements.contains(null)) { | ||
| null | ||
| } else { | ||
| val flattened = elements.flatMap( |
There was a problem hiding this comment.
Do we need size check whether the total number of array elements is less than or equal to possible max array size?
There was a problem hiding this comment.
I agree, especially in combination with this comment, provided that the resulted array length is known in advance. flatMap can then be replaced with a simple loop copying chanks of data into a preallocated array.
There was a problem hiding this comment.
I've been searching for a well-defined constant indicating the VM limit for array size. It seems that the limit is platform-dependent... Any idea how to get the limit for a given platform?
| s""" | ||
| |$numElemCode | ||
| |$unsafeArraySizeInBytes | ||
| |byte[] $arrayName = new byte[$arraySizeName]; |
There was a problem hiding this comment.
Do we need size check whether the total number of array elements is less than or equal to possible max array size?
If we could use long[], we can accept more array elements.
There was a problem hiding this comment.
You made a really good point about checking the total number of array elements!
Re: long[] - It seems that UnsafeArrayData is not currently ready for that. It would require a bigger refactoring... In theory, we could push limits even further. If implemented UnsafeArrayData in a similar way like Scala Vectors but with leaves represented as byte[MAX_SIZE], the only limits would be the heap size and computing power. But is there any real case scenario where we needed to store more than 2GB into one record?
There was a problem hiding this comment.
Sorry for late comment. I think that it is fine to use byte[] for now. It is just a possibly choice to use long[].
|
Test build #89518 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #89559 has finished for PR 20938 at commit
|
|
Jenkins, retest this please. |
|
Test build #89574 has finished for PR 20938 at commit
|
|
Test build #89575 has finished for PR 20938 at commit
|
|
Test build #89582 has finished for PR 20938 at commit
|
|
LGTM pending Jenkins. |
|
Test build #89620 has finished for PR 20938 at commit
|
|
Jenkins, retest this please. |
|
Test build #89631 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #89658 has finished for PR 20938 at commit
|
|
@ueshin you forget to merge this? ;) |
|
retest this please |
|
Test build #89775 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #89783 has finished for PR 20938 at commit
|
|
retest this please |
|
Test build #89788 has finished for PR 20938 at commit
|
|
I'm sorry for the delay. |
What changes were proposed in this pull request?
This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
How was this patch tested?
New tests added into:
Codegen examples
Primitive type
Result:
Non-primitive type
Result: